package com.niit.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;

public class TransactionProgram {

    public static void main(String[] args) {
        KafkaConsumer<String, String> consumer = createConsumer();
        KafkaProducer<String, String> producer = createProducer();

        //2. 生产者调用initTransactions初始化事务
        producer.initTransactions();
        //3.编写一个死循环。在循环中不断拉取数据，进行处理后，生产到新的主题中
        while (true){
            try{
                //生产者开启事务
                producer.beginTransaction();
                //这个Map保存了topic对应的partition的偏移量
                Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();
                // 从topic中拉取一批的数据
                // (2) 消费者拉取消息
                ConsumerRecords<String,String> consumerRecords =  consumer.poll(Duration.ofSeconds(2));
                //预处理数据
                for (ConsumerRecord<String,String> data  :consumerRecords){
                    String value = data.value();//张三,1,1980‐10‐09
                    String[] fieldArray = value.split(",");

                    String topic = data.topic();
                    int partition = data.partition();
                    long offset = data.offset();
                    System.out.println("消费数据："+fieldArray);

                    //offset + 1:offset指的是当前消费数据的偏移量，而我们希望下一次能继续从下一个消息处理
                    offsetMap.put(new TopicPartition(topic,partition),new OffsetAndMetadata(offset+1));
                    //字段的替换
                    if(fieldArray !=null && fieldArray.length>2){
                        String sexField = fieldArray[1]; // 1
                        if(sexField.equals("1")){
                            fieldArray[1] = "男";
                        }else if (sexField.equals("0")){
                            fieldArray[1] = "女";
                        }
                    }

                    //拼接字段
                    value = fieldArray[0] +","+fieldArray[1]+","+fieldArray[2];//张三,男,1980‐10‐09
                    //生产数据到新的主题 BD2_new_user
                    ProducerRecord<String,String> record = new ProducerRecord<>("BD2_new_user",value);
                    Future<RecordMetadata> future = producer.send(record);
                    try{
                        future.get();
                    }catch (Exception e){
                        e.printStackTrace();
                        producer.abortTransaction();
                    }


                }

                producer.sendOffsetsToTransaction(offsetMap,"old_user");
                producer.commitTransaction();
            }catch (Exception e){
                e.printStackTrace();
                //取消事务
                producer.abortTransaction();
            }




        }


    }
    //1.创建一个消费者，来消费BD2_old_user中的数据
    private static KafkaConsumer<String,String> createConsumer(){
        //1.1配置链接属性
        Properties props = new Properties();
        props.put("bootstrap.servers","node1:9092");
        props.put("group.id","old_user");
        //配置事物隔离级别
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");
        //关闭自动提交，我们通过手动提供offset,通过事物来维护offset
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("BD2_old_user"));

        return consumer;
    }
    //2.创建一个生产者，用于将转换后的数据生产到新主题当中
    private static KafkaProducer<String,String> createProducer(){
        Properties props = new Properties();
        props.put("bootstrap.servers","node1:9092");
        props.put("acks","all");
        // 开启事务必须要配置事务的ID
        props.put("transactional.id", "new_user");
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        //配置生产数据的 value 进行序列化
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(props);

        return producer;
    }

}
